package com.imooc.flink.course04

import java.util

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ArrayBuffer

/**
 * Demonstrates Flink DataSet broadcast variables.
 *
 * The steps, numbered as in the inline comments below:
 *  1. Create the DataSet that will be broadcast.
 *  2. Register it on an operator under a name with `withBroadcastSet`.
 *  3. Read it inside the rich function's `open` via `getBroadcastVariable`,
 *     using the same name.
 */
object BroadCast {

  // Single source of truth for the broadcast name: the name passed to
  // withBroadcastSet and the one passed to getBroadcastVariable must be equal.
  private final val BroadcastSetName = "broadCastSetName"

  /**
   * Builds a one-element DataSet, maps it with a rich function that returns
   * the contents of the broadcast DataSet, and prints the result.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    // 1. The DataSet to be broadcast
    val toBroadCast = env.fromElements(1, 2, 3)

    val data = env.fromElements("a")

    data.map(new RichMapFunction[String, ArrayBuffer[Int]]() {
      // Assigned in open(); it is null until open() has run.
      var broadCastSet: Traversable[Int] = null

      override def open(parameters: Configuration): Unit = {
        import scala.collection.JavaConverters._
        // 3. Access the broadcast DataSet as a Collection
        broadCastSet = getRuntimeContext.getBroadcastVariable[Int](BroadcastSetName).asScala
      }

      // The input element is ignored; every call returns a fresh buffer
      // holding a copy of the broadcast elements.
      override def map(in: String): ArrayBuffer[Int] =
        ArrayBuffer[Int]() ++= broadCastSet
    }).withBroadcastSet(toBroadCast, BroadcastSetName) // 2. Broadcast the DataSet
      .print()
  }
}
